package com.gitee.aleenjava.aliyun.mqtt.service.subscribe;

import com.gitee.aleenjava.aliyun.mqtt.annotation.MqttSubscriber;
import com.gitee.aleenjava.aliyun.mqtt.callback.MessageMqttCallback;
import com.gitee.aleenjava.aliyun.mqtt.config.MqttSubscriberProperties;
import com.gitee.aleenjava.aliyun.mqtt.enums.QosEnum;
import com.gitee.aleenjava.aliyun.mqtt.model.ApplicationContextHelper;
import com.gitee.aleenjava.aliyun.mqtt.model.SubscriberInfo;
import com.gitee.aleenjava.aliyun.mqtt.processor.MqttSubscribeProcessor;
import com.gitee.aleenjava.aliyun.mqtt.register.MqttSubscriberRegister;
import com.gitee.aleenjava.aliyun.mqtt.tools.Tools;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.util.Assert;

import java.util.List;

/**
 * 消息订阅者服务
 *
 * @author Administrator
 */
@Slf4j
public class MqttSubscribeClientService {

    private MqttClient mqttClient;

    private final MqttSubscriberProperties mqttSubscriberProperties;

    private final MqttConnectOptions mqttConnectOptions;


    private final MqttCallback mqttCallback;

    private final List<SubscriberInfo> subscriberInfoList;

    private final String domainUrl;

    public MqttSubscribeClientService(MqttSubscriberProperties mqttSubscriberProperties, MqttSubscriberRegister mqttSubscriberRegister) {
        this.mqttSubscriberProperties = mqttSubscriberProperties;
        domainUrl =  String.format("tcp://%s:%s",mqttSubscriberProperties.getIp(),mqttSubscriberProperties.getPort());
        subscriberInfoList = mqttSubscriberRegister.getSubscriberInfos();
        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setConnectionTimeout(mqttSubscriberProperties.getConnectionTimeout());
        mqttConnectOptions.setUserName("Signature|" + mqttSubscriberProperties.getAccessKey() + "|" + mqttSubscriberProperties.getInstanceId());
        mqttConnectOptions.setPassword(Tools.macSignature(mqttSubscriberProperties.getClientId(), mqttSubscriberProperties.getSecretKey()).toCharArray());
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setMaxInflight(1000);
        mqttConnectOptions.setCleanSession(mqttSubscriberProperties.isEnableCleanSession());
        mqttCallback = new MessageMqttCallback(domainUrl, mqttConnectOptions, mqttClient, mqttSubscriberProperties, subscriberInfoList);
    }

    private void init() throws Exception {
        Assert.notEmpty(subscriberInfoList, "No subscribers detected, not initializing mqtt subscribe client");
        try {
            mqttClient = new MqttClient(domainUrl, mqttSubscriberProperties.getClientId(), new MemoryPersistence());
            mqttClient.setCallback(mqttCallback);
            mqttClient.connect(mqttConnectOptions);
            log.info("mqtt subscribeClient connection successful, ip:{}, port:{}, clientId:{}", mqttSubscriberProperties.getIp(), mqttSubscriberProperties.getPort(), mqttSubscriberProperties.getClientId());
            for (SubscriberInfo subscriberInfo : subscriberInfoList) {
                String topic = subscriberInfo.getTopic();
                int qos = subscriberInfo.getQos().getQos();
                mqttClient.subscribe(topic, qos);
                log.info("mqtt subscribe success topic:{},qos:{}", topic, qos);
            }
        } catch (Exception e) {
            log.error("mqtt subscribe error,message", e);
            throw e;
        }
    }
}
